Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
やりたいこと
Cloud Composer上で動作する Airflow DAG で、Storage Transfer Service の転送ジョブの完了を待機する仕組みを検討しています。
具体的には、Google Cloud Storage に転送されたファイルを BigQuery に取り込むために、転送ジョブが完了するのを待つ必要があります。
今回は、CloudDataTransferServiceJobStatusSensor
を使用して、DAG から転送ジョブの完了を待機できるか試してみましたのでご紹介します。
CloudDataTransferServiceJobStatusSensor
とは
CloudDataTransferServiceJobStatusSensor
は、指定した Storage Transfer Service の転送ジョブのステータスを監視するためのセンサーです。
このセンサーを用いることで、特定のジョブが完了したかどうかを確認し、その結果に応じて次のタスクを実行することができます。
ただし、実行中ジョブの完了を待機するのではなく、ジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機するセンサーであることに注意が必要です
(私は当初この点を誤解しており、実行中ジョブの完了を待機するセンサーを想定しておりました)
環境作成
DAG を動かすために Cloud Composer 環境を作成します。
今回は最新バージョンの Composer 3 を選択しました。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、test-composer
という名前の環境を東京リージョンで作成しました。
設定はデフォルトのままとしていますが、サービスアカウントやネットワーク設定は必要に応じて調整してください。
ファイル準備
Storage Transfer Service で転送するファイルを準備します。
今回はcm_enokawa_work_source
という名前のバケットに、sales_
で始まる日別のCSV形式ファイルをアップロードしました。
このファイルが Storage Transfer Service のジョブにより転送され、最終的に BigQuery に取り込まれます。
DAGを作成する
まず、Google Cloud Storageに あるファイルを BigQuery に取り込むための DAG を作成します。
from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago
PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
'retries': 0, # リトライなし
}
with DAG(
dag_id='gcs_transfer_to_bq',
default_args=default_args,
schedule_interval=None, # 手動で実行する
catchup=False
) as dag:
# GCS のファイルリストを取得
list_gcs_files = GCSListObjectsOperator(
task_id='list_gcs_files',
bucket=BUCKET_NAME,
prefix='sales_', # 'sales_' で始まるファイルを取得
)
# 取得したファイルリストを使って BigQuery にロード
gcs_to_bq = GCSToBigQueryOperator(
task_id='load_gcs_to_bq',
bucket=BUCKET_NAME, # GCS バケット名
source_objects=list_gcs_files.output, # 取得したファイルリストを使う
destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', # BigQuery のデータセットとテーブル
source_format='CSV', # ソースファイル形式
write_disposition='WRITE_TRUNCATE', # 既存データの上書き
skip_leading_rows=1 # CSV のヘッダー行をスキップ
)
list_gcs_files >> gcs_to_bq
この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv
) を BigQuery のテーブル(sales_data
)にインポートしています。
次に、Storage Transfer Service の転送ジョブの完了を待機する DAG を作成します。
以下は、先ほどの DAG にCloudDataTransferServiceJobStatusSensor
を追加したものです。
from airflow import DAG
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import CloudDataTransferServiceJobStatusSensor
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago
PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
TRANSFER_JOB_ID = 'transferJobs/5129259777506586129' # 転送ジョブのID
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
'retries': 0, # リトライなし
}
with DAG(
dag_id='gcs_transfer_to_bq_with_status_sensor',
default_args=default_args,
schedule_interval=None, # 手動で実行する
catchup=False
) as dag:
# CloudDataTransferServiceJobStatusSensor を使って転送ジョブの完了を待機
transfer_job_status_task = CloudDataTransferServiceJobStatusSensor(
task_id='check_transfer_job_status',
job_name=TRANSFER_JOB_ID, # 転送ジョブのID
project_id=PROJECT_ID, # GCP プロジェクトID
expected_statuses=['SUCCESS'], # ジョブが成功ステータスを監視
poke_interval=60, # 60秒ごとにステータスをポーリング
timeout=60 * 10 # 10分以内に完了しなければタイムアウト
)
# GCS のファイルリストを取得
list_gcs_files = GCSListObjectsOperator(
task_id='list_gcs_files',
bucket=BUCKET_NAME,
prefix='sales_', # 'sales_' で始まるファイルを取得
)
# 取得したファイルリストを使って BigQuery にロード
gcs_to_bq = GCSToBigQueryOperator(
task_id='load_gcs_to_bq',
bucket=BUCKET_NAME, # GCS バケット名
source_objects=list_gcs_files.output, # 取得したファイルリストを使う
destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', # BigQuery のデータセットとテーブル
source_format='CSV', # ソースファイル形式
write_disposition='WRITE_TRUNCATE', # 既存データの上書き
skip_leading_rows=1 # CSV のヘッダー行をスキップ
)
transfer_job_status_task >> list_gcs_files >> gcs_to_bq
この DAG では、transfer_job_status_task
で Storage Transfer Service の転送ジョブがSUCCESS
になるまで待機します。
timeout
パタメータを指定して、10分以内 に完了しなければタイムアウトするようにしています。
監視対象となる転送ジョブは、転送ジョブIDで指定する必要があるため、事前に転送ジョブを作成してジョブIDを確認しておく必要があります。
転送ジョブを一時停止する
DAG を実行する前に、監視対象となる転送ジョブを実行中の状態にします。
具体的には、事前に作成した転送ジョブを開始後に一時停止します。
DAG を実行する
Cloud Composer の DAG 実行画面から、先ほど作成した DAG を手動で実行します。
まずは、CloudDataTransferServiceJobStatusSensor
を使用していないDAG(gcs_transfer_to_bq
)を実行してみます。
監視対象の転送ジョブは一時停止ステータスのままですが、DAG は正常終了しました。
DAG 側では転送ジョブのステータスを感知していないので、当然の結果となります。
次に、CloudDataTransferServiceJobStatusSensor
を使用した DAG(gcs_transfer_to_bq_with_status_sensor
)を実行します。
transfer_job_status_task
が実行中となり、転送ジョブの完了を待機します。
転送ジョブを再開する
転送ジョブを再開し、DAG がどのように動作するかを確認します。
一時停止中の転送ジョブを再開します。
転送ジョブが成功ステータスで完了しました。
DAG を確認すると、正常終了していました。
想定通り、監視対象の転送ジョブが完了するのを待機する動きになっているようです。
ガントチャートでも、transfer_job_status_task
が待機している様子が確認できます。
タイムアウトさせてみる
最後に、CloudDataTransferServiceJobStatusSensor
がタイムアウトしたときの動作を確認してみます。
先ほどと同じように、転送ジョブを一時停止させた状態で DAG を実行します。
ただし、今回はタイムアウトさせるために 10分以上 一時停止させた状態にしておきます。
DAG を動かしてみると、すぐに正常終了してしまいました。
transfer_job_status_task
(CloudDataTransferServiceJobStatusSensor
)が実行中の状態のまま待機する動きを想定していましたが、想定とは異なる結果となりました。
改めてドキュメントを確認すると、以下の記載がありました。
Waits for at least one operation belonging to the job to have the expected status.
どうやら、
現在実行中の転送ジョブが期待通りのステータスになるまで待機する
のではなく、
ジョブに属する少なくとも 1 つの操作が期待どおりのステータスになるまで待機する
という仕様のようです。
ということで、新たに転送ジョブを作成し、監視対象のジョブIDを作成した転送ジョブのものに差し替えて、同じように動作確認してみました。
すると、transfer_job_status_task
の実行開始後 10分 経過するとタイムアウトとなり、DAG が失敗しました。
ガントチャートでも、 10分間 待機している様子が確認できます。
さらに、転送ジョブのステータスを変えながら、CloudDataTransferServiceJobStatusSensor
の挙動を確認してみました。
以下は、転送ジョブのステータスとCloudDataTransferServiceJobStatusSensor
が待機するか否かの関係を整理した表です。
転送ジョブのステータス | 待機する / 待機しない |
---|---|
未実行 | 待機する |
実行中 | 待機する |
成功 | 待機しない |
失敗 | 待機する |
ドキュメントに記載の通り、成功ステータスが現れるまで待機する挙動になっていることが分かります。
まとめ
以上、Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視する方法を試してみました。
CloudDataTransferServiceJobStatusSensor
を活用することで、転送ジョブが成功するまで待機する仕組みを構築できることが確認できました。
ただし、このセンサーはジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機する仕様であるため、DAG内で作成・実行した転送ジョブを監視するシーンに特に適しています。
具体的には、下記のオペレーターと組み合わせることで、より柔軟で効率的なデータ転送ワークフローを実現できます。
CloudDataTransferServiceCreateJobOperator
:新しい転送ジョブを作成するオペレーターCloudDataTransferServiceRunJobOperator
:既存の転送ジョブを実行するオペレーター
ちなみに、CloudDataTransferServiceJobStatusSensor
を使用したタスク(今回のケースではtransfer_job_status_task
)の詳細画面の [Cloud Storage Transfer Job] ボタンから対象の 転送ジョブのコンソール画面に遷移することができます。便利!